-
Couldn't load subscription status.
- Fork 152
Session API #789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Session API #789
Conversation
6ea1621 to
2f9bbee
Compare
94ec7d0 to
e5caee2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this generally looks good - lmk when the API is considered final
aa93417 to
212035c
Compare
51915ab to
0c89008
Compare
c52f944 to
9b16217
Compare
|
|
|
The state machines: Simulator.Screen.Recording.-.iPhone.17.-.2025-10-23.at.12.10.15.mov |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work on this @pblazej, looking forward to seeing this get merged!
|
Did one more fix - I noticed that there was a "gap" that we discussed in session vs agent preconnect states: a787606 Now it's kinda more obvious with |
f9e3fbc to
3930acb
Compare
3930acb to
7a71453
Compare
|
|
||
| private enum State { | ||
| case disconnected | ||
| case connecting(buffering: Bool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's equivalent to JS isBufferingSpeech so pre-connect buffer, it's not exposed anywhere, but we can rename it
| mutating func connecting(buffering: Bool) { | ||
| log("Agent connecting from \(state)") | ||
| switch state { | ||
| case .disconnected, .connecting: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's no .reconnecting state in the agent itself, this overall Agent.State is a little artificial as it's derived partially from the room, as mentioned above: #789 (comment)
| mutating func connected(participant: Participant) { | ||
| log("Agent connected to \(participant) from \(state)") | ||
| switch state { | ||
| case .connecting, .connected: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the perspective of UI framework, it does not really matter (for perf) https://medium.com/airbnb-engineering/understanding-and-improving-swiftui-performance-37b77ac61896 as we use the non-equatable (default) comparison for the State struct.
|
|
||
| init(room: Room) { | ||
| self.room = room | ||
| room.add(delegate: self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Room does not keep a strong ref to the delegates
| let delegates = NSHashTable<AnyObject>.weakObjects() |
| /// Creates a new message stream for the transcription delegate receiver. | ||
| func messages() -> AsyncStream<ReceivedMessage> { | ||
| let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self) | ||
| self.continuation = continuation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this messages() will be called multiple times?
If yes, the current code stores a single continuation, calling messages() again overwrites it and could it leave the old stream hanging ?
In that case, I am a bit worrying that it never finish the stream either, so consumers can hang ?
And I wonder if we should have an explicit stop function like
func stop() {
room?.remove(delegate: self)
room = nil
continuation?.finish()
continuation = nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a general problem with AsyncStream<> being exposed here (without storing anything internally). It's not intended to be used by multiple consumers at all (known issue). There's also no equivalent AnyAsyncSequence (as AnyPublisher).
it never finish the stream either,
It can be cancelled from the outside like this:
let locations = AsyncLocationStream()
let task = Task {
for await location in locations.stream {
print(location)
}
}
task.cancel()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: stream idempotence, I think we've got 2 choices:
- leave is as is - it won't register another consumer for this topic, just
throwingStreamError.handlerAlreadyRegistered - unregister before registering, so the previous stream will stop working - I think that's the worst one
Sources/LiveKit/Agent/Session.swift
Outdated
| private func observe(receivers: [any MessageReceiver]) { | ||
| for receiver in receivers { | ||
| Task { [weak self] in | ||
| for await message in try await receiver.messages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any chance that messages() will throw ?
If that is possible, this loop just exits quietly.
How about wrapping in do/catch and log so users know why a stream stopped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
6846414 yeah I think it's doable
| Task { [weak self] in | ||
| for try await _ in room.changes { | ||
| guard let self else { return } | ||
| updateAgent(in: room) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the room is immutable public let room: Room, I don't see a reason for cancelling on disconnect while we wanna observe connection state as well: connectionState = room.connectionState
| for try await _ in localParticipant.changes { | ||
| guard let self else { return } | ||
|
|
||
| microphoneTrack = localParticipant.firstAudioTrack |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This localMedia class is @mainactor,
I think you should run code on the main thread:
await MainActor.run {
self.microphoneTrack = localParticipant.firstAudioTrack
self.cameraTrack = localParticipant.firstCameraVideoTrack
self.screenShareTrack = localParticipant.firstScreenShareVideoTrack
self.isMicrophoneEnabled = localParticipant.isMicrophoneEnabled()
self.isCameraEnabled = localParticipant.isCameraEnabled()
self.isScreenShareEnabled = localParticipant.isScreenShareEnabled()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } | ||
| } | ||
|
|
||
| Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as these code touches the self properties, use @mainactor [weak self] in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above #789 (comment)
|
|
||
| guard let cameraCapturer = getCameraCapturer() else { return } | ||
| let captureOptions = CameraCaptureOptions(device: videoDevice) | ||
| _ = try? await cameraCapturer.set(options: captureOptions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this cameraCaptureer.set fails, should we still set the selectedVideoDeviceID ?
How about
guard let capturer = getCameraCapturer() else { return }
do {
try await capturer.set(options: .init(device: videoDevice))
await MainActor.run {
self.selectedVideoDeviceID = videoDevice.uniqueID
}
} catch {
self.error = .mediaDevice(error)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep 99e74b8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW there's an issue to improve this API overall #177
|
looks good. I have some questions / comments, please address them. |
|
@xianshijing-lk thank you for looking into that! I think I added missing error handlers. The only thing that is a tradeoff and cannot be easily fixed is the I did not want to revert it to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm.

Adds 3 basic building blocks for simple(r) agent experiences:
Session- connection, pre-connect, agent dispatch, agent filtering (e.g. by name), all agents, messages (broadcasted and aggregated for now)Agent- wrapper aroundParticipant, knows its tracks and internal stateLocalMedia- (unrelated) helper to deal with local tracks in SwiftUIExample: livekit-examples/agent-starter-swift#29